RabbitMQ Remote Procedure Call (RPC)

Preface

This article implements the remote procedure call (RPC) model on top of RabbitMQ.

How it works

Data flow diagram

  1. rpc_queue: the request queue, which holds the incoming requests;
  2. reply_to: the response queue, which holds the response results;
  3. correlationId: the identifier that ties each Request to its Response.
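
The interplay of the three pieces above can be sketched without a broker at all. The following is a minimal in-memory illustration, assuming plain java.util.concurrent queues stand in for RabbitMQ; the class InMemoryRpcDemo, its Message type, and its helper methods are hypothetical names made up for this sketch and are not part of the RabbitMQ client.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the broker. Every name in this class is hypothetical.
class InMemoryRpcDemo {
    // A message body plus the two properties the RPC pattern relies on.
    static final class Message {
        final String correlationId;
        final String replyTo; // name of the response queue; null on responses
        final String body;

        Message(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    // Looks up a queue by name, creating it on first use.
    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Server: read requests from rpc_queue and answer on each request's replyTo queue.
    static void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = queue("rpc_queue").take();
                    String result = "response-result-" + request.body;
                    // Echo the correlationId so the client can match the response.
                    queue(request.replyTo).put(new Message(request.correlationId, null, result));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Client: publish a request, then wait for the response with the same correlationId.
    static String call(String body) {
        String corrId = UUID.randomUUID().toString();
        try {
            queue("rpc_queue").put(new Message(corrId, "replyTo_queue", body));
            while (true) {
                Message reply = queue("replyTo_queue").take();
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // Any other correlationId belongs to a different request; this sketch drops it.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the response", e);
        }
    }

    public static void main(String[] args) {
        startServer();
        System.out.println(call("hello")); // prints response-result-hello
    }
}
```

Because the response queue is shared, the client cannot assume that the next message it reads belongs to its own request; the correlationId check is what makes that safe.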

RPCClient

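import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName = "replyTo_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("1234");
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setAutomaticRecoveryEnabled(true);

        connection = connectionFactory.newConnection();
        channel = connection.createChannel();
        // Make sure the response queue exists before it is consumed from
        // (the flags mirror the rpc_queue declaration in RPCServer).
        channel.queueDeclare(replyQueueName, false, false, false, null);
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId) // unique identifier of this request
                .replyTo(replyQueueName) // response queue chosen by the client
                .build();
        // Holds the single response that belongs to this call
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        // Start consuming before publishing so the response always finds a listener
        String consumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Match the response to the request through the unique correlationId
                if (corrId.equals(properties.getCorrelationId())) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        String result = response.take(); // block until the response arrives
        // Cancel the consumer so later calls do not compete with stale consumers
        channel.basicCancel(consumerTag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}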
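
One thing worth noting about call(): it ends with response.take(), which blocks indefinitely if no RPCServer is running or the response is lost. A minimal sketch of a bounded wait follows, assuming the response is handed over through a java.util.concurrent.BlockingQueue as in the client; the class BoundedWait and the method awaitReply are hypothetical names used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: waits for a response, but only for a limited time.
class BoundedWait {
    // Returns the response, or null if none arrived within timeoutMillis.
    static String awaitReply(BlockingQueue<String> response, long timeoutMillis) {
        try {
            // poll(timeout, unit) gives up after the timeout, unlike take()
            return response.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        response.offer("response-result-hello");
        System.out.println(awaitReply(response, 100)); // prints response-result-hello
        System.out.println(awaitReply(response, 100)); // prints null
    }
}
```

How the caller reacts to a null result (retry, fail, fall back) is a design decision this sketch leaves open.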

RPCServer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("1234");
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setAutomaticRecoveryEnabled(true);

        Connection connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // Deliver at most one unacknowledged request to this server at a time
        channel.basicQos(1);
        System.out.println(" [x] Awaiting RPC requests");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Echo the correlationId so the client can match the response
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String response = "";
                try {
                    String message = new String(body, "UTF-8");
                    System.out.println("request(" + message + ")");
                    response = "response-result-" + message;
                } catch (Exception e) {
                    System.out.println(e.toString());
                } finally {
                    // Publish the result to the response queue named by properties.getReplyTo()
                    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    // Acknowledge the request only after the response has been published
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // autoAck is false: requests are acknowledged manually in handleDelivery
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    }
}

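Summary

The RPCClient and RPCServer examples above share one problem, although it is unlikely to occur in practice. RPCServer first publishes the result to the response queue and only then acknowledges the request on rpc_queue. If the first step succeeds but the ack never reaches the broker, for example because the broker crashes or the connection drops at that moment, the broker still considers the request unacknowledged and delivers it again once service is restored, so a request that has already been handled is processed a second time. Guarding against this requires both RPCClient and RPCServer to be idempotent.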
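
One possible way to make the server side idempotent is to remember the result computed for each correlationId and return it again on redelivery instead of redoing the work. The following is a minimal sketch under stated assumptions: the correlationId is used as the deduplication key, results are kept in an in-memory map that is lost on restart and never evicted, and the class IdempotentHandler with its handle method is a hypothetical name, not part of the RabbitMQ client.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: processes each correlationId at most once.
class IdempotentHandler {
    // Results already computed, keyed by correlationId (in memory only).
    private final Map<String, String> results = new ConcurrentHashMap<>();
    // Counts how many times the business logic actually ran.
    final AtomicInteger executions = new AtomicInteger();

    String handle(String correlationId, String message) {
        // computeIfAbsent runs the function only for an unseen correlationId
        return results.computeIfAbsent(correlationId, id -> {
            executions.incrementAndGet();
            return "response-result-" + message;
        });
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        String first = handler.handle("id-1", "hello");
        String redelivered = handler.handle("id-1", "hello"); // simulated redelivery
        System.out.println(first.equals(redelivered)); // prints true
        System.out.println(handler.executions.get());  // prints 1
    }
}
```

A real deployment would more likely keep the processed identifiers in a store that survives restarts and expire old entries; the client side is already tolerant of duplicates because it ignores responses whose correlationId it is not waiting for.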

References

  1. http://www.rabbitmq.com/tutorials/tutorial-six-java.html
  2. http://www.rabbitmq.com/direct-reply-to.html